package com.atuguigu.flink.Day02.window;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Example job: per-key minimum and maximum temperature over 5-second tumbling
 * processing-time windows, implemented with an incremental
 * {@link AggregateFunction} whose accumulator and output are POJO classes.
 *
 * <p>Only readings whose id is {@code "sensor_1"} are kept; each window result
 * is printed to standard output.
 */
public class Example3_1 {
    // Approach 2: implement the aggregation with POJO classes.

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStreamSource<SendsorReading> stream = env.addSource(new SensorSource());

        stream
                // Constant-first equals so a reading with a null id is dropped instead of throwing.
                .filter(r -> "sensor_1".equals(r.id))
                .keyBy(
                        new KeySelector<SendsorReading, String>() {
                            @Override
                            public String getKey(SendsorReading sendsorReading) throws Exception {
                                return sendsorReading.id;
                            }
                        }
                )
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new MinMaxTempWithPOJO())
                .print();

        env.execute();
    }

    /**
     * Incrementally tracks the minimum and maximum temperature seen in a window.
     *
     * <p>Input: {@link SendsorReading}; accumulator: {@link Acc}; output: {@link MinMaxWindow}.
     */
    public static class MinMaxTempWithPOJO implements AggregateFunction<SendsorReading,Acc,MinMaxWindow> {

        /**
         * Creates an empty accumulator.
         *
         * <p>The running minimum starts at positive infinity and the running maximum
         * at negative infinity, so the first reading added always replaces both.
         * (Note that {@code Double.MIN_VALUE} is the smallest <em>positive</em> double
         * and is therefore not a valid seed for a maximum, nor for a minimum.)
         *
         * @return a fresh accumulator with an empty id
         */
        @Override
        public Acc createAccumulator() {
            return new Acc("", Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY);
        }

        /**
         * Folds one reading into the accumulator.
         *
         * @param sendsorReading the incoming reading
         * @param acc the accumulator so far
         * @return a new accumulator carrying the reading's id and the updated min/max
         */
        @Override
        public Acc add(SendsorReading sendsorReading, Acc acc) {
            return new Acc(
                    sendsorReading.id,
                    Math.min(sendsorReading.temperture, acc.minTemp),
                    Math.max(sendsorReading.temperture, acc.maxTemp));
        }

        /**
         * Converts the final accumulator into the window result.
         *
         * @param acc the accumulator at window end
         * @return the id, minimum and maximum held by the accumulator
         */
        @Override
        public MinMaxWindow getResult(Acc acc) {
            return new MinMaxWindow(acc.id, acc.minTemp, acc.maxTemp);
        }

        /**
         * Combines two partial accumulators into one.
         *
         * <p>The tumbling window used in {@code main} is not expected to trigger a
         * merge, but returning a correct result keeps this function safe to reuse
         * with merging window assigners.
         *
         * @param acc the first partial accumulator
         * @param acc1 the second partial accumulator
         * @return an accumulator holding the smaller minimum and larger maximum of the two
         */
        @Override
        public Acc merge(Acc acc, Acc acc1) {
            // Prefer whichever side has already seen a reading (non-empty id).
            String mergedId = (acc.id == null || acc.id.isEmpty()) ? acc1.id : acc.id;
            return new Acc(
                    mergedId,
                    Math.min(acc.minTemp, acc1.minTemp),
                    Math.max(acc.maxTemp, acc1.maxTemp));
        }
    }


    /** Accumulator: the sensor id plus the running minimum and maximum temperature. */
    public  static class Acc{
        public  String id;
        public  Double minTemp;
        public  Double maxTemp;

        /** No-argument constructor; leaves all fields {@code null}. */
        public Acc() {
        }

        /**
         * @param id sensor id
         * @param minTemp running minimum temperature
         * @param maxTemp running maximum temperature
         */
        public Acc(String id, Double minTemp, Double maxTemp) {
            this.id = id;
            this.minTemp = minTemp;
            this.maxTemp = maxTemp;
        }

        @Override
        public String toString() {
            return "Acc{" +
                    "id='" + id + '\'' +
                    ", minTemp=" + minTemp +
                    ", maxTemp=" + maxTemp +
                    '}';
        }
    }

    /** Output type: the minimum and maximum temperature of one sensor in one window. */
    public static class MinMaxWindow{
        public  String id;
        public Double minTemp;
        public Double maxTemp;

        /** No-argument constructor; leaves all fields {@code null}. */
        public MinMaxWindow() {
        }

        /**
         * @param id sensor id
         * @param minTemp minimum temperature in the window
         * @param maxTemp maximum temperature in the window
         */
        public MinMaxWindow(String id, Double minTemp, Double maxTemp) {
            this.id = id;
            this.minTemp = minTemp;
            this.maxTemp = maxTemp;
        }

        @Override
        public String toString() {
            return "MinMaxWindow{" +
                    "id='" + id + '\'' +
                    ", minTemp=" + minTemp +
                    ", maxTemp=" + maxTemp +
                    '}';
        }
    }

}
